/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.scheduler;

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Shared slot implementation for the {@link SlotSharingExecutionSlotAllocator}.
 *
 * <p>The shared slots are owned and tracked by {@link SlotSharingExecutionSlotAllocator}.
 * The shared slot represents a collection of {@link SingleLogicalSlot} requests which share one physical slot.
 * The shared slot is created by the {@link SlotSharingExecutionSlotAllocator} from the physical slot request.
 * Afterwards, {@link SlotSharingExecutionSlotAllocator} requests logical slots from the underlying physical slot
 * for {@link Execution executions} which share it.
 *
 * <p>The shared slot becomes a {@link PhysicalSlot.Payload} of its underlying physical slot
 * once the physical slot is obtained. If the allocated physical slot gets released then it calls back the shared slot
 * to release the logical slots which fail their execution payloads.
 *
 * <p>A logical slot request can be cancelled if it is not completed yet or returned by the execution
 * if it has been completed and given to the execution by {@link SlotSharingExecutionSlotAllocator}.
 * If the underlying physical slot fails, it fails all logical slot requests.
 * The failed, cancelled or returned logical slot requests are removed from the shared slot.
 * Once the shared slot has no registered logical slot requests, it calls back its {@link SlotSharingExecutionSlotAllocator}
 * to remove it from the allocator and cancel its underlying physical slot request if the request is not fulfilled yet.
 */
class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
	private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);

	/** Id of the physical slot request which backs this shared slot. */
	private final SlotRequestId physicalSlotRequestId;

	/** Resource profile associated with the physical slot request. */
	private final ResourceProfile physicalSlotResourceProfile;

	/** Group of executions which are allowed to request logical slots from this shared slot. */
	private final ExecutionSlotSharingGroup executionSlotSharingGroup;

	/** Future of the physical slot; on completion this shared slot is assigned as the payload of the physical slot. */
	private final CompletableFuture<PhysicalSlot> slotContextFuture;

	/** Registered logical slot requests, addressable by execution vertex id and by logical slot request id. */
	private final DualKeyLinkedMap<ExecutionVertexID, SlotRequestId, CompletableFuture<SingleLogicalSlot>> requestedLogicalSlots;

	/** Flag passed on to the created logical slots and reported by {@link #willOccupySlotIndefinitely()}. */
	private final boolean slotWillBeOccupiedIndefinitely;

	/** Callback invoked once, when the last registered logical slot request has been removed. */
	private final Consumer<ExecutionSlotSharingGroup> externalReleaseCallback;

	/** Lifecycle state; switches to {@link State#RELEASED} when the external release callback is invoked. */
	private State state;

	/**
	 * Creates a shared slot on top of a physical slot request.
	 *
	 * @param physicalSlotRequestId id of the underlying physical slot request
	 * @param physicalSlotResourceProfile resource profile of the underlying physical slot request
	 * @param executionSlotSharingGroup executions which may request logical slots from this shared slot
	 * @param slotContextFuture future of the underlying physical slot
	 * @param slotWillBeOccupiedIndefinitely whether the slot will be occupied indefinitely
	 * @param externalReleaseCallback callback to call when this shared slot has no logical slot requests left
	 */
	SharedSlot(
			SlotRequestId physicalSlotRequestId,
			ResourceProfile physicalSlotResourceProfile,
			ExecutionSlotSharingGroup executionSlotSharingGroup,
			CompletableFuture<PhysicalSlot> slotContextFuture,
			boolean slotWillBeOccupiedIndefinitely,
			Consumer<ExecutionSlotSharingGroup> externalReleaseCallback) {
		this.physicalSlotRequestId = physicalSlotRequestId;
		this.physicalSlotResourceProfile = physicalSlotResourceProfile;
		this.executionSlotSharingGroup = executionSlotSharingGroup;
		this.requestedLogicalSlots = new DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
		this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
		this.externalReleaseCallback = externalReleaseCallback;
		this.state = State.ALLOCATED;
		// Registered last: if the given future is already complete, the callback runs synchronously
		// and hands 'this' to the physical slot, so all other fields must be initialized by then.
		this.slotContextFuture = slotContextFuture.thenApply(physicalSlot -> {
			Preconditions.checkState(
				physicalSlot.tryAssignPayload(this),
				"Unexpected physical slot payload assignment failure!");
			return physicalSlot;
		});
	}

	SlotRequestId getPhysicalSlotRequestId() {
		return physicalSlotRequestId;
	}

	ResourceProfile getPhysicalSlotResourceProfile() {
		return physicalSlotResourceProfile;
	}

	CompletableFuture<PhysicalSlot> getSlotContextFuture() {
		return slotContextFuture;
	}

	/**
	 * Registers an allocation request for a logical slot.
	 *
	 * <p>The logical slot request is complete once the underlying physical slot request is complete.
	 * If a request for the given execution already exists, its future is reused.
	 *
	 * @param executionVertexId {@link ExecutionVertexID} of the execution for which to allocate the logical slot
	 * @return the logical slot future
	 * @throws IllegalStateException if this shared slot has already been released
	 * @throws IllegalArgumentException if the execution does not belong to the {@link ExecutionSlotSharingGroup}
	 */
	CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID executionVertexId) {
		Preconditions.checkState(state == State.ALLOCATED, "SharedSlot (physical request %s) has been released", physicalSlotRequestId);
		Preconditions.checkArgument(
			executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId),
			"Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup",
			executionVertexId);
		CompletableFuture<SingleLogicalSlot> logicalSlotFuture = requestedLogicalSlots.getValueByKeyA(executionVertexId);
		if (logicalSlotFuture != null) {
			LOG.debug("Request for {} already exists", getLogicalSlotString(executionVertexId));
		} else {
			logicalSlotFuture = allocateNonExistentLogicalSlot(executionVertexId);
		}
		// widen CompletableFuture<SingleLogicalSlot> to CompletableFuture<LogicalSlot>
		return logicalSlotFuture.thenApply(Function.identity());
	}

	/**
	 * Creates and registers a new logical slot request for an execution which has no request yet.
	 *
	 * @param executionVertexId {@link ExecutionVertexID} of the execution for which to allocate the logical slot
	 * @return the future of the newly requested logical slot
	 */
	private CompletableFuture<SingleLogicalSlot> allocateNonExistentLogicalSlot(ExecutionVertexID executionVertexId) {
		SlotRequestId logicalSlotRequestId = new SlotRequestId();
		String logMessageBase = getLogicalSlotString(logicalSlotRequestId, executionVertexId);
		LOG.debug("Request a {}", logMessageBase);

		CompletableFuture<SingleLogicalSlot> logicalSlotFuture = slotContextFuture
			.thenApply(physicalSlot -> {
				LOG.debug("Allocated {}", logMessageBase);
				return createLogicalSlot(physicalSlot, logicalSlotRequestId);
			});
		// The request has to be registered before the failure callback is attached because
		// the callback may run immediately and expects to find the request in the map.
		requestedLogicalSlots.put(executionVertexId, logicalSlotRequestId, logicalSlotFuture);

		// If the physical slot request fails (slotContextFuture), it will also fail the logicalSlotFuture.
		// Therefore, the next `exceptionally` callback will call removeLogicalSlotRequest and do the cleanup
		// in requestedLogicalSlots and eventually in sharedSlots
		logicalSlotFuture.exceptionally(cause -> {
			LOG.debug("Failed {}", logMessageBase, cause);
			removeLogicalSlotRequest(logicalSlotRequestId);
			return null;
		});
		return logicalSlotFuture;
	}

	/**
	 * Creates a logical slot on top of the given physical slot with this shared slot as its {@link SlotOwner}.
	 *
	 * @param physicalSlot the obtained physical slot
	 * @param logicalSlotRequestId id of the logical slot request
	 * @return the new logical slot
	 */
	private SingleLogicalSlot createLogicalSlot(PhysicalSlot physicalSlot, SlotRequestId logicalSlotRequestId) {
		return new SingleLogicalSlot(
			logicalSlotRequestId,
			physicalSlot,
			null,
			Locality.UNKNOWN,
			this,
			slotWillBeOccupiedIndefinitely);
	}

	/**
	 * Cancels a logical slot request.
	 *
	 * <p>If the logical slot request is already complete, nothing happens because the logical slot is already given
	 * to the execution and it is the responsibility of the execution to call {@link #returnLogicalSlot(LogicalSlot)}.
	 *
	 * <p>If the logical slot request is not complete yet, its future gets cancelled or failed.
	 *
	 * @param executionVertexID {@link ExecutionVertexID} of the execution for which to cancel the logical slot
	 * @param cause the reason of cancellation or null if it is not available
	 * @throws IllegalStateException if this shared slot has already been released
	 */
	void cancelLogicalSlotRequest(ExecutionVertexID executionVertexID, @Nullable Throwable cause) {
		Preconditions.checkState(state == State.ALLOCATED, "SharedSlot (physical request %s) has been released", physicalSlotRequestId);
		CompletableFuture<SingleLogicalSlot> logicalSlotFuture = requestedLogicalSlots.getValueByKeyA(executionVertexID);
		if (logicalSlotFuture != null) {
			SlotRequestId logicalSlotRequestId = requestedLogicalSlots.getKeyBByKeyA(executionVertexID);
			LOG.debug("Cancel {} from {}", getLogicalSlotString(logicalSlotRequestId), executionVertexID);
			// If the logicalSlotFuture was not completed and now it fails, the exceptionally callback will also call removeLogicalSlotRequest
			if (cause == null) {
				logicalSlotFuture.cancel(false);
			} else {
				logicalSlotFuture.completeExceptionally(cause);
			}
		} else {
			// No request is registered for this execution, hence there is no logical slot request id to report.
			LOG.debug("No logical slot request for execution vertex {} from physical slot ({})", executionVertexID, physicalSlotRequestId);
		}
	}

	/**
	 * Returns a logical slot to this shared slot by removing its request.
	 *
	 * @param logicalSlot the logical slot to return
	 * @throws IllegalStateException if no request is registered for the logical slot
	 */
	@Override
	public void returnLogicalSlot(LogicalSlot logicalSlot) {
		removeLogicalSlotRequest(logicalSlot.getSlotRequestId());
	}

	/**
	 * Removes a registered logical slot request and releases this shared slot externally if it was the last one.
	 *
	 * @param logicalSlotRequestId id of the logical slot request to remove
	 * @throws IllegalStateException if no request with the given id is registered
	 */
	private void removeLogicalSlotRequest(SlotRequestId logicalSlotRequestId) {
		// build the log message before the removal, otherwise the execution vertex id cannot be looked up anymore
		LOG.debug("Remove {}", getLogicalSlotString(logicalSlotRequestId));
		boolean wasRegistered = requestedLogicalSlots.removeKeyB(logicalSlotRequestId) != null;
		Preconditions.checkState(
			wasRegistered,
			"Trying to remove a logical slot request which has been either already removed or never created.");
		releaseExternally();
	}

	/**
	 * Releases all registered logical slots with the given cause and then releases this shared slot externally.
	 *
	 * <p>As described in the class documentation, this is the call back from the allocated physical slot,
	 * hence the physical slot future and all logical slot futures are expected to be done.
	 *
	 * @param cause the reason of the release, forwarded to every logical slot
	 * @throws IllegalStateException if the physical slot future or any of the logical slot futures is not done
	 */
	@Override
	public void release(Throwable cause) {
		Preconditions.checkState(
			slotContextFuture.isDone(),
			"Releasing of the shared slot is expected only from its successfully allocated physical slot (%s)",
			physicalSlotRequestId);
		LOG.debug("Release shared slot ({})", physicalSlotRequestId);

		// copy the logical slot collection to avoid ConcurrentModificationException
		// if logical slot releases cause cancellation of other executions
		// which will try to call returnLogicalSlot and modify requestedLogicalSlots collection
		Map<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> logicalSlotFutures = requestedLogicalSlots
			.keySetA()
			.stream()
			.collect(Collectors.toMap(executionVertexId -> executionVertexId, requestedLogicalSlots::getValueByKeyA));
		for (Map.Entry<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> entry : logicalSlotFutures.entrySet()) {
			LOG.debug("Release {}", getLogicalSlotString(entry.getKey()));
			CompletableFuture<SingleLogicalSlot> logicalSlotFuture = entry.getValue();
			Preconditions.checkNotNull(logicalSlotFuture);
			Preconditions.checkState(
				logicalSlotFuture.isDone(),
				"Logical slot future must already done when release call comes from the successfully allocated physical slot (%s)",
				physicalSlotRequestId);
			logicalSlotFuture.thenAccept(logicalSlot -> logicalSlot.release(cause));
		}
		requestedLogicalSlots.clear();
		releaseExternally();
	}

	/**
	 * Invokes the external release callback if there are no logical slot requests left.
	 *
	 * <p>The callback is invoked at most once because the state is switched to {@link State#RELEASED} beforehand.
	 */
	private void releaseExternally() {
		if (state != State.RELEASED && isEmpty()) {
			state = State.RELEASED;
			LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
			externalReleaseCallback.accept(executionSlotSharingGroup);
		}
	}

	@Override
	public boolean willOccupySlotIndefinitely() {
		return slotWillBeOccupiedIndefinitely;
	}

	/** Describes a logical slot by its request id; the execution vertex id is looked up in the registered requests. */
	private String getLogicalSlotString(SlotRequestId logicalSlotRequestId) {
		return getLogicalSlotString(logicalSlotRequestId, requestedLogicalSlots.getKeyAByKeyB(logicalSlotRequestId));
	}

	/** Describes a logical slot by its execution vertex id; the request id is looked up in the registered requests. */
	private String getLogicalSlotString(ExecutionVertexID executionVertexId) {
		return getLogicalSlotString(requestedLogicalSlots.getKeyBByKeyA(executionVertexId), executionVertexId);
	}

	/** Formats the description of a logical slot used in the log messages of this class. */
	private String getLogicalSlotString(SlotRequestId logicalSlotRequestId, ExecutionVertexID executionVertexId) {
		return String.format(
			"logical slot (%s) for execution vertex (id %s) from the physical slot (%s)",
			logicalSlotRequestId,
			executionVertexId,
			physicalSlotRequestId);
	}

	/**
	 * Returns whether the shared slot has no assigned logical slot requests.
	 */
	boolean isEmpty() {
		return requestedLogicalSlots.size() == 0;
	}

	/** Lifecycle states of the shared slot. */
	private enum State {
		/** Initial state in which logical slots can be requested and cancelled. */
		ALLOCATED,
		/** State after the external release callback has been invoked. */
		RELEASED
	}
}
